package io.rsocket.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameUtil;
import io.rsocket.internal.ClientServerInputMultiplexer;
import io.rsocket.plugins.DuplexConnectionInterceptor;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: classes4.dex */
public class ClientServerInputMultiplexer implements Closeable {
    // Shared logger, registered under the name "io.rsocket.FrameLogger"; InternalDuplexConnection
    // uses it to log frames at debug level.
    private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");
    // Registry used by the single-argument constructor. Nothing in this file registers
    // interceptors on it.
    private static final InitializingInterceptorRegistry emptyInterceptorRegistry = new InitializingInterceptorRegistry();
    // Connection initialised with Type.CLIENT; receives the frame group keyed CLIENT.
    private final DuplexConnection clientConnection;
    // Connection receiving both the CLIENT and SERVER frame groups. Unlike the other views it is
    // not passed through the interceptor registry.
    private final DuplexConnection clientServerConnection;
    // Connection initialised with Type.SERVER; receives the frame group keyed SERVER.
    private final DuplexConnection serverConnection;
    // Connection initialised with Type.SETUP; receives the frame group keyed SETUP.
    private final DuplexConnection setupConnection;
    // Set to true by the frame classifier (lambda$new$0) once a SETUP, RESUME or RESUME_OK frame
    // has been seen on stream 0; the same classifier reads it to reject earlier frames.
    private boolean setupReceived;
    // The connection handed to the constructor; dispose(), isDisposed() and onClose() delegate to it.
    private final DuplexConnection source;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes4.dex */
    public static class InternalDuplexConnection implements DuplexConnection {
        private final boolean debugEnabled = ClientServerInputMultiplexer.LOGGER.isDebugEnabled();
        private final MonoProcessor<Flux<ByteBuf>>[] processors;
        private final DuplexConnection source;

        @SafeVarargs
        public InternalDuplexConnection(DuplexConnection duplexConnection, MonoProcessor<Flux<ByteBuf>>... monoProcessorArr) {
            this.source = duplexConnection;
            this.processors = monoProcessorArr;
        }

        public static /* synthetic */ Publisher lambda$null$2(InternalDuplexConnection internalDuplexConnection, Flux flux) {
            return internalDuplexConnection.debugEnabled ? flux.doOnNext(new Consumer() { // from class: io.rsocket.internal.-$$Lambda$ClientServerInputMultiplexer$InternalDuplexConnection$SeCBrLt5C4uCi8pO68RWepFTtf4
                @Override // java.util.function.Consumer
                public final void accept(Object obj) {
                    ClientServerInputMultiplexer.LOGGER.debug("receiving -> " + FrameUtil.toString((ByteBuf) obj));
                }
            }) : flux;
        }

        @Override // io.rsocket.DuplexConnection
        public ByteBufAllocator alloc() {
            return this.source.alloc();
        }

        @Override // io.rsocket.DuplexConnection, io.rsocket.Availability
        public double availability() {
            return this.source.availability();
        }

        @Override // reactor.core.Disposable
        public void dispose() {
            this.source.dispose();
        }

        @Override // reactor.core.Disposable
        public boolean isDisposed() {
            return this.source.isDisposed();
        }

        @Override // io.rsocket.Closeable
        public Mono<Void> onClose() {
            return this.source.onClose();
        }

        @Override // io.rsocket.DuplexConnection
        public Flux<ByteBuf> receive() {
            return Flux.fromArray(this.processors).flatMap(new Function() { // from class: io.rsocket.internal.-$$Lambda$ClientServerInputMultiplexer$InternalDuplexConnection$CJ9XeNTQX-8-bHo1ndDBSoeQSBY
                @Override // java.util.function.Function
                public final Object apply(Object obj) {
                    Publisher flatMapMany;
                    flatMapMany = ((MonoProcessor) obj).flatMapMany(new Function() { // from class: io.rsocket.internal.-$$Lambda$ClientServerInputMultiplexer$InternalDuplexConnection$vhqc_dXTSgHwIgYXi7FxTGCbS-4
                        @Override // java.util.function.Function
                        public final Object apply(Object obj2) {
                            return ClientServerInputMultiplexer.InternalDuplexConnection.lambda$null$2(ClientServerInputMultiplexer.InternalDuplexConnection.this, (Flux) obj2);
                        }
                    });
                    return flatMapMany;
                }
            });
        }

        @Override // io.rsocket.DuplexConnection
        public Mono<Void> send(Publisher<ByteBuf> publisher) {
            if (this.debugEnabled) {
                publisher = Flux.from(publisher).doOnNext(new Consumer() { // from class: io.rsocket.internal.-$$Lambda$ClientServerInputMultiplexer$InternalDuplexConnection$SvX587J_H1pdM0OV5d_lUf9J_G8
                    @Override // java.util.function.Consumer
                    public final void accept(Object obj) {
                        ClientServerInputMultiplexer.LOGGER.debug("sending -> " + FrameUtil.toString((ByteBuf) obj));
                    }
                });
            }
            return this.source.send(publisher);
        }

        @Override // io.rsocket.DuplexConnection
        public Mono<Void> sendOne(ByteBuf byteBuf) {
            if (this.debugEnabled) {
                ClientServerInputMultiplexer.LOGGER.debug("sending -> " + FrameUtil.toString(byteBuf));
            }
            return this.source.sendOne(byteBuf);
        }
    }

    /**
     * Creates a multiplexer over the given connection using the shared
     * {@code emptyInterceptorRegistry} and passing {@code false} for the boolean flag of the
     * three-argument constructor.
     *
     * @param duplexConnection the connection whose inbound frames are to be split
     */
    public ClientServerInputMultiplexer(DuplexConnection duplexConnection) {
        this(duplexConnection, emptyInterceptorRegistry, false);
    }

    /**
     * Creates a multiplexer that splits the inbound frames of {@code duplexConnection} into
     * setup, server and client streams and exposes one {@link DuplexConnection} view per stream.
     *
     * <p>The source connection is first passed through the registry with
     * {@code Type.SOURCE}; the setup, server and client views are each passed through the
     * registry with their own type. The combined client/server view is not intercepted.
     * Subscription to the source's inbound frames happens here, in the constructor.
     *
     * @param duplexConnection the connection whose inbound frames are to be split
     * @param initializingInterceptorRegistry registry used to initialise the source connection
     *     and each per-type view
     * @param z flag forwarded to the frame classifier: when {@code true} the "SETUP first" check
     *     is skipped and stream-0 LEASE/KEEPALIVE/ERROR frames are routed to the client view
     *     (presumably "this side is the client" - confirm against callers)
     */
    public ClientServerInputMultiplexer(DuplexConnection duplexConnection, InitializingInterceptorRegistry initializingInterceptorRegistry, final boolean z) {
        this.source = duplexConnection;

        // One processor per frame group; each is completed with that group's Flux once the
        // first frame of the group arrives.
        final MonoProcessor<Flux<ByteBuf>> setupFrames = MonoProcessor.create();
        final MonoProcessor<Flux<ByteBuf>> serverFrames = MonoProcessor.create();
        final MonoProcessor<Flux<ByteBuf>> clientFrames = MonoProcessor.create();

        final DuplexConnection intercepted =
                initializingInterceptorRegistry.initConnection(DuplexConnectionInterceptor.Type.SOURCE, duplexConnection);

        this.setupConnection = initializingInterceptorRegistry.initConnection(
                DuplexConnectionInterceptor.Type.SETUP, new InternalDuplexConnection(intercepted, setupFrames));
        this.serverConnection = initializingInterceptorRegistry.initConnection(
                DuplexConnectionInterceptor.Type.SERVER, new InternalDuplexConnection(intercepted, serverFrames));
        this.clientConnection = initializingInterceptorRegistry.initConnection(
                DuplexConnectionInterceptor.Type.CLIENT, new InternalDuplexConnection(intercepted, clientFrames));
        this.clientServerConnection = new InternalDuplexConnection(intercepted, clientFrames, serverFrames);

        // The callbacks capture the local processors directly. The decompiled form referenced
        // "MonoProcessor.this", which is not valid Java because MonoProcessor is not an
        // enclosing class of this constructor.
        intercepted.receive()
                .groupBy(frame -> lambda$new$0(this, z, frame))
                .subscribe(
                        group -> lambda$new$1(setupFrames, serverFrames, clientFrames, group),
                        error -> lambda$new$2(setupFrames, serverFrames, clientFrames, error));
    }

    /**
     * Classifies an inbound frame as belonging to the setup, server or client stream.
     *
     * <p>Frames on a non-zero stream are routed by stream-id parity: odd ids to CLIENT, even ids
     * to SERVER. Frames on stream 0 are routed by frame type: SETUP/RESUME/RESUME_OK go to SETUP
     * (and mark the multiplexer as having received setup); LEASE/KEEPALIVE/ERROR go to CLIENT
     * when {@code z} is true and SERVER otherwise; every other type goes the opposite way.
     *
     * @param clientServerInputMultiplexer multiplexer whose {@code setupReceived} flag is
     *     updated and consulted
     * @param z when true the "setup must come first" check is skipped
     * @param byteBuf the frame to classify; released before the exception below is thrown
     * @return the group key for the frame
     * @throws IllegalStateException if {@code z} is false, the frame is not a setup frame and no
     *     setup frame has been seen yet
     */
    public static /* synthetic */ DuplexConnectionInterceptor.Type lambda$new$0(ClientServerInputMultiplexer clientServerInputMultiplexer, boolean z, ByteBuf byteBuf) {
        final DuplexConnectionInterceptor.Type target;
        final int id = FrameHeaderCodec.streamId(byteBuf);
        if (id != 0) {
            // Odd stream ids belong to the client group, even ones to the server group.
            target = (id & 1) != 0 ? DuplexConnectionInterceptor.Type.CLIENT : DuplexConnectionInterceptor.Type.SERVER;
        } else {
            switch (FrameHeaderCodec.frameType(byteBuf)) {
                case SETUP:
                case RESUME:
                case RESUME_OK:
                    clientServerInputMultiplexer.setupReceived = true;
                    target = DuplexConnectionInterceptor.Type.SETUP;
                    break;
                case LEASE:
                case KEEPALIVE:
                case ERROR:
                    target = z ? DuplexConnectionInterceptor.Type.CLIENT : DuplexConnectionInterceptor.Type.SERVER;
                    break;
                default:
                    target = z ? DuplexConnectionInterceptor.Type.SERVER : DuplexConnectionInterceptor.Type.CLIENT;
                    break;
            }
        }

        final boolean accepted = z
                || target == DuplexConnectionInterceptor.Type.SETUP
                || clientServerInputMultiplexer.setupReceived;
        if (!accepted) {
            // The frame will never reach a subscriber, so release it here before failing.
            byteBuf.release();
            throw new IllegalStateException("SETUP or LEASE frame must be received before any others.");
        }
        return target;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands a newly opened frame group to the processor that matches its key: SETUP to
     * {@code monoProcessor}, SERVER to {@code monoProcessor2}, CLIENT to {@code monoProcessor3}.
     * Groups with any other key are ignored.
     */
    public static /* synthetic */ void lambda$new$1(MonoProcessor monoProcessor, MonoProcessor monoProcessor2, MonoProcessor monoProcessor3, GroupedFlux groupedFlux) {
        final MonoProcessor sink;
        switch ((DuplexConnectionInterceptor.Type) groupedFlux.key()) {
            case SETUP:
                sink = monoProcessor;
                break;
            case SERVER:
                sink = monoProcessor2;
                break;
            case CLIENT:
                sink = monoProcessor3;
                break;
            default:
                // No view consumes groups of any other type.
                return;
        }
        sink.onNext(groupedFlux);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Propagates a failure of the grouped inbound stream to all three processors, in the order
     * {@code monoProcessor}, {@code monoProcessor2}, {@code monoProcessor3}.
     */
    public static /* synthetic */ void lambda$new$2(MonoProcessor monoProcessor, MonoProcessor monoProcessor2, MonoProcessor monoProcessor3, Throwable th) {
        final MonoProcessor[] sinks = {monoProcessor, monoProcessor2, monoProcessor3};
        for (MonoProcessor sink : sinks) {
            sink.onError(th);
        }
    }

    /**
     * Returns the view built in the constructor with {@code Type.CLIENT}.
     *
     * @return the client-side connection view
     */
    public DuplexConnection asClientConnection() {
        return clientConnection;
    }

    /**
     * Returns the view that receives both the client and the server frame groups. This view is
     * created directly in the constructor without going through the interceptor registry.
     *
     * @return the combined client/server connection view
     */
    public DuplexConnection asClientServerConnection() {
        return clientServerConnection;
    }

    /**
     * Returns the view built in the constructor with {@code Type.SERVER}.
     *
     * @return the server-side connection view
     */
    public DuplexConnection asServerConnection() {
        return serverConnection;
    }

    /**
     * Returns the view built in the constructor with {@code Type.SETUP}.
     *
     * @return the setup connection view
     */
    public DuplexConnection asSetupConnection() {
        return setupConnection;
    }

    /** Disposes the connection this multiplexer was created with. */
    @Override
    public void dispose() {
        source.dispose();
    }

    /**
     * Reports the disposal state of the connection this multiplexer was created with.
     *
     * @return whatever the underlying connection reports from {@code isDisposed()}
     */
    @Override
    public boolean isDisposed() {
        return source.isDisposed();
    }

    /**
     * Exposes the close signal of the connection this multiplexer was created with.
     *
     * @return the underlying connection's {@code onClose()} publisher
     */
    @Override
    public Mono<Void> onClose() {
        return source.onClose();
    }
}
